package com.rabbit.icore;


import com.rabbit.consumer.AddConsumer;
import com.rabbit.consumer.ReduceConsumer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Spring configuration that wires the RabbitMQ infrastructure beans: the connection factory,
 * a {@link RabbitAdmin}, one listener container per business ("add" and "reduce"), the
 * {@link RabbitTemplatePlus} used for publishing, and the JSON message converter.
 */
@Configuration
public class MqConfig {

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;

    @Autowired
    private AddConsumer addConsumer;
    @Autowired
    private ReduceConsumer reduceConsumer;
    @Autowired
    private Queues queues;

    /**
     * Creates the connection factory shared by every other bean in this configuration.
     *
     * @return a caching connection factory with publisher confirms and returns enabled
     */
    @Bean
    public ConnectionFactory getConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // setAddresses accepts a comma-separated list of brokers so the client can move on to the
        // next one when the current broker goes down (original author's note: not tested).
        // A non-empty address list overrides the host+port properties, so the configured port
        // has to be part of each address or it would silently be ignored.
        connectionFactory.setAddresses(resolveAddresses());
        // Kept as the fallback used when the address list turns out to be empty.
        connectionFactory.setPort(port);
        // Enable publisher confirms, correlated with the published message.
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // Enable publisher returns.
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Creates the {@link RabbitAdmin} used to declare queues.
     *
     * @return an admin bound to the shared connection factory
     */
    @Bean
    public RabbitAdmin getRabbitAdmin() {
        return new RabbitAdmin(getConnectionFactory());
    }

    /**
     * Listener container for the "add" business. A Spring XML configuration can attach several
     * listeners to one container; here each container is bound to exactly one listener.
     *
     * @return a manual-ack container listening on the initial "add" queues
     * @throws IllegalStateException if no "add" queue group is configured
     */
    @Bean("Add_SimpleMessageListenerContainer")
    public SimpleMessageListenerContainer getAdd_SimpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = newManualAckContainer("add");
        container.setMessageListener(addConsumer);
        return container;
    }

    /**
     * Listener container for the "reduce" business.
     *
     * @return a manual-ack container listening on the initial "reduce" queues
     * @throws IllegalStateException if no "reduce" queue group is configured
     */
    @Bean("Reduce_SimpleMessageListenerContainer")
    public SimpleMessageListenerContainer getReduce_SimpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = newManualAckContainer("reduce");
        container.setMessageListener(reduceConsumer);
        return container;
    }


    /**
     * Creates the {@link RabbitTemplatePlus} used for publishing.
     *
     * @return a template bound to the shared connection factory and the JSON converter
     */
    @Bean
//    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE,proxyMode = ScopedProxyMode.TARGET_CLASS)
    public RabbitTemplatePlus getRabbitTemplatePlus() {
        RabbitTemplatePlus rabbitTemplatePlus = new RabbitTemplatePlus();
        // Publish through the shared connection factory.
        rabbitTemplatePlus.setConnectionFactory(getConnectionFactory());
        // Mandatory publishing: together with publisher returns, unroutable messages are handed
        // back to the publisher instead of being dropped by the broker.
        rabbitTemplatePlus.setMandatory(true);
        // Use JSON payloads so messages are easy to convert and free of encoding problems.
        rabbitTemplatePlus.setMessageConverter(jsonMessageConverter());
        return rabbitTemplatePlus;
    }

    /**
     * @return the Jackson-based JSON message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Builds the broker address list from the configured host value, appending the configured
     * port to every comma-separated entry that does not already contain a {@code ':'}.
     * Entries that already contain a colon (an explicit port, or an IPv6 literal) are passed
     * through untouched; blank entries are skipped.
     */
    private String resolveAddresses() {
        StringBuilder resolved = new StringBuilder();
        for (String entry : host.split(",")) {
            String address = entry.trim();
            if (address.isEmpty()) {
                continue;
            }
            if (resolved.length() > 0) {
                resolved.append(',');
            }
            resolved.append(address);
            if (address.indexOf(':') < 0) {
                resolved.append(':').append(port);
            }
        }
        return resolved.toString();
    }

    /**
     * Builds a listener container with manual acknowledgement and exactly one consumer
     * (initial and maximum), listening on the initial queues configured under {@code queueKey}.
     * The caller is responsible for attaching the message listener.
     */
    private SimpleMessageListenerContainer newManualAckContainer(String queueKey) {
        List<String> queueNames = queues.getQueueArray().get(queueKey);
        if (queueNames == null) {
            throw new IllegalStateException("No queues configured for key '" + queueKey + "'");
        }
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnectionFactory());
        // Messages must be acknowledged explicitly by the listener.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Initial number of consumers.
        container.setConcurrentConsumers(1);
        // Upper bound on the number of consumers.
        container.setMaxConcurrentConsumers(1);
        // Queues the container listens on at startup.
        container.setQueueNames(queueNames.toArray(new String[0]));
        return container;
    }

}
